/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.jboss.netty.channel.socket.nio;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;

abstract class AbstractNioSelector implements NioSelector {

	private static final AtomicInteger nextId = new AtomicInteger();

	private final int id = nextId.incrementAndGet();

	private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value,
														// but won't need
														// customization.

	/**
	 * Executor used to execute {@link Runnable}s such as channel registration
	 * task.
	 */
	private final Executor executor;

	/**
	 * If this worker has been started thread will be a reference to the thread
	 * used when starting. i.e. the current thread when the run method is
	 * executed.
	 */
	protected volatile Thread thread;

	/**
	 * The NIO {@link Selector}.
	 */
	protected volatile Selector selector;

	/**
	 * Boolean that controls determines if a blocked Selector.select should
	 * break out of its selection process. In our case we use a timeone for the
	 * select method and the select method will block for that time unless waken
	 * up.
	 */
	protected final AtomicBoolean wakenUp = new AtomicBoolean();

	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

	private volatile int cancelledKeys; // should use AtomicInteger but we just
										// need approximation

	private final CountDownLatch shutdownLatch = new CountDownLatch(1);
	private volatile boolean shutdown;

	AbstractNioSelector(Executor executor) {
		this(executor, null);
	}

	AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
		this.executor = executor;
		openSelector(determiner);
	}

	public void register(Channel channel, ChannelFuture future) {
		Runnable task = createRegisterTask(channel, future);
		registerTask(task);
	}

	protected final void registerTask(Runnable task) {
		taskQueue.add(task);

		Selector selector = this.selector;

		if (selector != null) {
			if (wakenUp.compareAndSet(false, true)) {
				selector.wakeup();
			}
		} else {
			if (taskQueue.remove(task)) {
				// the selector was null this means the Worker has already been
				// shutdown.
				throw new RejectedExecutionException("Worker has already been shutdown");
			}
		}
	}

	protected final boolean isIoThread() {
		return Thread.currentThread() == thread;
	}

	public void rebuildSelector() {
		if (!isIoThread()) {
			taskQueue.add(new Runnable() {
				public void run() {
					rebuildSelector();
				}
			});
			return;
		}

		final Selector oldSelector = selector;
		final Selector newSelector;

		if (oldSelector == null) {
			return;
		}

		try {
			newSelector = Selector.open();
		} catch (Exception e) {
			e.printStackTrace();
			return;
		}

		// Register all channels to the new Selector.
		for (;;) {
			try {
				for (SelectionKey key : oldSelector.keys()) {
					try {
						if (key.channel().keyFor(newSelector) != null) {
							continue;
						}

						int interestOps = key.interestOps();
						key.cancel();
						key.channel().register(newSelector, interestOps, key.attachment());
					} catch (Exception e) {
						close(key);
					}
				}
			} catch (ConcurrentModificationException e) {
				// Probably due to concurrent modification of the key set.
				continue;
			}

			break;
		}

		selector = newSelector;

		try {
			// time to close the old selector as everything else is registered
			// to the new one
			oldSelector.close();
		} catch (Throwable t) {
			t.printStackTrace();
		}

		// logger.info("Migrated " + nChannels +
		// " channel(s) to the new Selector,");
	}

	public void run() {
		thread = Thread.currentThread();

		int selectReturnsImmediately = 0;
		Selector selector = this.selector;

		if (selector == null) {
			return;
		}
		// use 80% of the timeout for measure
		final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
		boolean wakenupFromLoop = false;
		for (;;) {
			wakenUp.set(false);

			try {
				long beforeSelect = System.nanoTime();
				int selected = select(selector);
				if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
					long timeBlocked = System.nanoTime() - beforeSelect;

					if (timeBlocked < minSelectTimeout) {
						boolean notConnected = false;
						// loop over all keys as the selector may was unblocked
						// because of a closed channel
						for (SelectionKey key : selector.keys()) {
							SelectableChannel ch = key.channel();
							try {
								if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() || ch instanceof SocketChannel
										&& !((SocketChannel) ch).isConnected()) {
									notConnected = true;
									// cancel the key just to be on the safe
									// side
									key.cancel();
								}
							} catch (CancelledKeyException e) {
								// ignore
							}
						}
						if (notConnected) {
							selectReturnsImmediately = 0;
						} else {
							// returned before the minSelectTimeout elapsed with
							// nothing select.
							// this may be the cause of the jdk epoll(..) bug,
							// so increment the counter
							// which we use later to see if its really the jdk
							// bug.
							selectReturnsImmediately++;
						}
					} else {
						selectReturnsImmediately = 0;
					}

					if (selectReturnsImmediately == 1024) {
						// The selector returned immediately for 10 times in a
						// row,
						// so recreate one selector as it seems like we hit the
						// famous epoll(..) jdk bug.
						rebuildSelector();
						selector = this.selector;
						selectReturnsImmediately = 0;
						wakenupFromLoop = false;
						// try to select again
						continue;
					}
				} else {
					// reset counter
					selectReturnsImmediately = 0;
				}

				// 'wakenUp.compareAndSet(false, true)' is always evaluated
				// before calling 'selector.wakeup()' to reduce the wake-up
				// overhead. (Selector.wakeup() is an expensive operation.)
				//
				// However, there is a race condition in this approach.
				// The race condition is triggered when 'wakenUp' is set to
				// true too early.
				//
				// 'wakenUp' is set to true too early if:
				// 1) Selector is waken up between 'wakenUp.set(false)' and
				// 'selector.select(...)'. (BAD)
				// 2) Selector is waken up between 'selector.select(...)' and
				// 'if (wakenUp.get()) { ... }'. (OK)
				//
				// In the first case, 'wakenUp' is set to true and the
				// following 'selector.select(...)' will wake up immediately.
				// Until 'wakenUp' is set to false again in the next round,
				// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
				// any attempt to wake up the Selector will fail, too, causing
				// the following 'selector.select(...)' call to block
				// unnecessarily.
				//
				// To fix this problem, we wake up the selector again if wakenUp
				// is true immediately after selector.select(...).
				// It is inefficient in that it wakes up the selector for both
				// the first case (BAD - wake-up required) and the second case
				// (OK - no wake-up required).

				if (wakenUp.get()) {
					wakenupFromLoop = true;
					selector.wakeup();
				} else {
					wakenupFromLoop = false;
				}

				cancelledKeys = 0;
				processTaskQueue();
				selector = this.selector; // processTaskQueue() can call
											// rebuildSelector()

				if (shutdown) {
					this.selector = null;

					// process one time again
					processTaskQueue();

					for (SelectionKey k : selector.keys()) {
						close(k);
					}

					try {
						selector.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
					shutdownLatch.countDown();
					break;
				} else {
					process(selector);
				}
			} catch (Throwable t) {
				t.printStackTrace();
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// Ignore.
				}
			}
		}
	}

	/**
	 * Start the {@link AbstractNioWorker} and return the {@link Selector} that
	 * will be used for the {@link AbstractNioChannel}'s when they get
	 * registered
	 */
	private void openSelector(ThreadNameDeterminer determiner) {
		try {
			selector = Selector.open();
		} catch (Throwable t) {
			throw new ChannelException("Failed to create a selector.", t);
		}

		// Start the worker thread with the new Selector.
		boolean success = false;
		try {
			DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
			success = true;
		} finally {
			if (!success) {
				// Release the Selector if the execution fails.
				try {
					selector.close();
				} catch (Throwable e) {
					e.printStackTrace();
				}
				selector = null;
				// The method will return to the caller at this point.
			}
		}
		assert selector != null && selector.isOpen();
	}

	private void processTaskQueue() {
		for (;;) {
			final Runnable task = taskQueue.poll();
			if (task == null) {
				break;
			}
			task.run();
			try {
				cleanUpCancelledKeys();
			} catch (IOException e) {
				// Ignore
			}
		}
	}

	protected final void increaseCancelledKeys() {
		cancelledKeys++;
	}

	protected final boolean cleanUpCancelledKeys() throws IOException {
		if (cancelledKeys >= CLEANUP_INTERVAL) {
			cancelledKeys = 0;
			selector.selectNow();
			return true;
		}
		return false;
	}

	public void shutdown() {
		if (isIoThread()) {
			throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
		}

		Selector selector = this.selector;
		shutdown = true;
		if (selector != null) {
			selector.wakeup();
		}
		try {
			shutdownLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		}
	}

	protected abstract void process(Selector selector) throws IOException;

	protected int select(Selector selector) throws IOException {
		return SelectorUtil.select(selector);
	}

	protected abstract void close(SelectionKey k);

	protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);

	protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
}
